OpenTelemetry Colloctor Receiver編
本日はReceiverの実装を見ます。
Contribに様々なReceiverがありますが、ここでは基本となるotlp receiverについてみていきます。
OTLP Receiver
otlpreceiverはOpenTelemetry CollectorのコアReceiverで、OTLPでデータを受信するための実装です。gRPC と HTTP/JSON の両方をサポートしており、Traces、Metrics、Logs、Profiles の4つのシグナルタイプに対応しています。
gRPC/HTTP サーバーの分離: 両方のプロトコルを同時にサポート
Consumer パターン: 各シグナルタイプのConsumerを保持し、パイプラインの次のステージへデータを渡す
ObsReport: gRPC/HTTPそれぞれのObservabilityレポートを個別管理
code:go
type otlpReceiver struct {
cfg *Config
serverGRPC *grpc.Server
serverHTTP *http.Server
nextTraces consumer.Traces
nextMetrics consumer.Metrics
nextLogs consumer.Logs
nextProfiles xconsumer.Profiles
shutdownWG sync.WaitGroup
obsrepGRPC *receiverhelper.ObsReport
obsrepHTTP *receiverhelper.ObsReport
settings *receiver.Settings
}
sharedcomponentというものがあり、複数のシグナルタイプ(Traces、Metrics、Logs)が同じ設定を持つ場合、同一のReceiver インスタンスを共有できるようになっています。
code:go
// This is the map of already created OTLP receivers for particular configurations.
// We maintain this map because the receiver.Factory is asked trace and metric receivers separately
// when it gets CreateTraces() and CreateMetrics() but they must not
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
FactoryがCreateTracesやCreateMetricsを呼び出すと、LoadOrStoreで既存のインスタンスを返すか、新規作成するような感じになっていそうです。
code:go
// createTraces creates a trace receiver based on provided config.
func createTraces(
_ context.Context,
set receiver.Settings,
cfg component.Config,
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
},
)
if err != nil {
return nil, err
}
r.Unwrap().registerTraceConsumer(nextConsumer)
return r, nil
}
データを受け取ったあとは全体の流れとしては下記のような流れになります。
code:md
│
▼
┌─────────────────────────────────────────────────────────┐
│ otlpReceiver │
│ │
│ ┌────────────────┐ ┌────────────────────────────┐│
│ │ gRPC/HTTP │ │ encoder (proto/json) ││
│ │ Deserialize │────►│ unmarshalTracesRequest() ││
│ └────────────────┘ └────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ ptraceotlp.ExportRequest │ │
│ │ ├── orig: *ExportTrace... │ │
│ │ └── state: *State │ │
│ └──────────────────────────────┘ │
│ │ │
│ │ .Traces() │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ ptrace.Traces │ │
│ │ (internal.TracesWrapper) │ │
│ └──────────────────────────────┘ │
│ │ │
│ │ nextConsumer. │
│ │ ConsumeTraces() │
│ ▼ │
└────────────────────────────────────│────────────────────┘
│
┌──────────────────────┼──────────────────────┐
│ │ │
▼ ▼ ▼
Processor Processor Exporter
(mutating) (readonly)
内部で使われているメソッドに関してはprotoから自動生成されているものが大半です。
Consumer
1つのReceiverからProcessorへデータを流す際はファンアウトのConsumerを通ります。
code:go
type tracesConsumer struct {
mutable []consumer.Traces
readonly []consumer.Traces
}
func (tsc *tracesConsumer) Capabilities() consumer.Capabilities {
// If all consumers are mutating, then the original data will be passed to one of them.
return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0 && len(tsc.readonly) == 0}
}
内部的にMutatesData/ReadOnlyのフラグでCunsumer内部におけるデータ処理の方法が定義されておりし、後続のProcessorへどのような経路を通るか判定されています。
code:md
Receiver
│
│ │
│ └─── return processor ← ラップせずそのまま返す
│
▼
Processor (直接)
Receiver
│
│ │
│ └─── return &tracesConsumer{...} ← FanoutConsumer を返す
│
▼
tracesConsumer (FanoutConsumer)
│
├───► Processor1
│
└───► Processor2
Mutable Consumers への配信が行われた際は複数のConsumerが存在する場合に、Cloneが行われます。
code:go
if len(tsc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(tsc.mutable)-1; i++ {
errs = multierr.Append(errs, tsc.mutablei.ConsumeTraces(ctx, cloneTraces(td))) }
// Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer since the
// non-mutating consumer may process data async and the mutating consumer may change the data before that.
if len(tsc.readonly) == 0 && !td.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, td))
} else {
errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, cloneTraces(td)))
}
}
ObsReport
Collector自身の状態を監視するための仕組みがあります。internal telemetryで表されCollectorが正常にデータを処理できているかを観測することができます。
code:go
// end span according to errors
if span.IsRecording() {
var acceptedItemsKey, refusedItemsKey, failedItemsKey string
switch signal {
case pipeline.SignalTraces:
acceptedItemsKey = internal.AcceptedSpansKey
refusedItemsKey = internal.RefusedSpansKey
failedItemsKey = internal.FailedSpansKey
case pipeline.SignalMetrics:
acceptedItemsKey = internal.AcceptedMetricPointsKey
refusedItemsKey = internal.RefusedMetricPointsKey
failedItemsKey = internal.FailedMetricPointsKey
case pipeline.SignalLogs:
acceptedItemsKey = internal.AcceptedLogRecordsKey
refusedItemsKey = internal.RefusedLogRecordsKey
failedItemsKey = internal.FailedLogRecordsKey
}
span.SetAttributes(
attribute.String(internal.FormatKey, format),
attribute.Int64(acceptedItemsKey, int64(numAccepted)),
attribute.Int64(refusedItemsKey, int64(numRefused)),
attribute.Int64(failedItemsKey, int64(numFailedErrors)),
)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
}
まとめ
Receiver実装を見ました。明日はProcessorを見ます。